<?php
namespace app\api\controller;

use Stomp\Client;
use Stomp\Network\Observer\Exception\HeartbeatException;
use Stomp\Network\Observer\ServerAliveObserver;
use Stomp\StatefulStomp;

use think\Log;
use app\admin\model\devmgmt\Device;
use app\admin\model\datamanagement\Thinfo;

class Amqpgetmsg {

    private function print($msg)
    {
        LOG::write("[Amqpgetmsg] $msg", "info");
    }
    private function log($msg)
    {
        LOG::write("[Amqpgetmsg] $msg", "info");
    }
    private function start_consume() 
    {
        
        //参数说明，请参见AMQP客户端接入说明文档。
        $accessKey = 'LTAI5tBheH5B2R8nysPysjgL';
        $accessSecret = 'sHCk7efgZrqnMWr1VWPNyd7XoLUZam';
        $consumerGroupId = "xH1KPIjZ6twtNZtOeChR000100";
        //iotInstanceId：购买的实例请填写实例ID，公共实例请填空字符串""。
        $iotInstanceId = "";
        $timeStamp = round(microtime(true) * 1000);
        //签名方法：支持hmacmd5，hmacsha1和hmacsha256。
        $signMethod = "hmacsha1";
        // 此处clientID 为自定义
        $clientId = "461af9ec-32d0-4c6f-b05b-dhter";
        //userName组装方法，请参见AMQP客户端接入说明文档。
        //若使用二进制传输，则userName需要添加encode=base64参数，服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
        $userName = $clientId . "|authMode=aksign"
            . ",signMethod=" . $signMethod
            . ",timestamp=" . $timeStamp
            . ",authId=" . $accessKey
            . ",iotInstanceId=" . $iotInstanceId
            . ",consumerGroupId=" . $consumerGroupId
            . "|";
        $signContent = "authId=" . $accessKey . "&timestamp=" . $timeStamp;
        //计算签名，password组装方法，请参见AMQP客户端接入说明文档。
        $password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, $raw_output = TRUE));
        //接入域名，请参见AMQP客户端接入说明文档。下方 123456 替换为你的阿里云账号id， cn-shanghai 替换为你的地区代码  若是PHP开发，端口号是  61614
        $client = new Client('ssl://281486323202144879.iot-amqp.cn-shanghai.aliyuncs.com:61614');
        $sslContext = ['ssl' => ['verify_peer' => true, 'verify_peer_name' => false]];
        $client->getConnection()->setContext($sslContext);

        //服务端心跳监听。
        $observer = new ServerAliveObserver();
        $client->getConnection()->getObservers()->addObserver($observer);
        //心跳设置，需要云端每10s发送一次心跳包。
        $client->setHeartbeat(0, 10000);
        $client->setLogin($userName, $password);
        try {
            $client->connect();
        } catch (StompException $e) {
            echo "failed to connect to server, msg:" . $e->getMessage(), PHP_EOL;
        }
        //无异常时继续执行。
        $stomp = new StatefulStomp($client);
        $stomp->subscribe('/topic/#');
        return $stomp;
    }
    /*
    qos:1
    destination:/as/mqtt/status/a1DLzb56jJu/test666
    message-id:1399330831021037706
    topic:/as/mqtt/status/a1DLzb56jJu/test666
    subscription:7756741791155551660
    generateTime:1622461443082

    {
        "lastTime":"2021-05-31 19:44:03.069",
        "iotId":"aBk1zvpgDnHBwEmL7P9O000000",
        "utcLastTime":"2021-05-31T11:44:03.069Z",
        "clientIp":"112.4.64.162",
        "utcTime":"2021-05-31T11:44:03.082Z",
        "time":"2021-05-31 19:44:03.082",
        "productKey":"a1DLzb56jJu",
        "deviceName":"test666",
        "status":"online"
    }
    */
    private function update_device_state($msg)
    {
        $ret = 0;
        //从'{"lastTime"'开始截取
        $json_str = substr($msg,strripos($msg,'{"lastTime"'));
        //移除最后一个回车符，防止json_decode报错
        $json_str = substr($json_str, 0, strlen($json_str)-1);
        $json_obj = json_decode($json_str);
        
        if(!strcmp("online", $json_obj->status)){
            $state = 0;
        }
        else{
            $state = 1;
        }
        $this->log('Update the deviceName ['.$json_obj->deviceName.'] status to ['.$json_obj->status.']');

        $ret = (new Device())
            ->where('devicename', '=', $json_obj->deviceName)
            ->update(['state' => $state, 'updatetime' => time()]);

        return $ret;
    }
    
/*
qos:1                                               
destination:/a1oz5VdCSZk/DHTER00002/user/update     
message-id:1456643124779631104                      
topic:/a1oz5VdCSZk/DHTER00002/user/update           
subscription:5401539722651994546                    
generateTime:1636125758725                          
                                               
{"DeviceName":"XXXXX","CurrentTemperature":21.8,"CurrentHumidity":71.0}                                               
*/
    private function add_thinfo($msg)
    {
        $ret = 0;
        //从'{"lastTime"'开始截取
        $json_str = substr($msg,strripos($msg,'{"DeviceName"'));
        //移除最后一个回车符，防止json_decode报错
        $json_str = substr($json_str, 0, strlen($json_str)-1);

        $json_obj = json_decode($json_str);

        $this->print('DeviceName:'.$json_obj->DeviceName.', CurrentTemperature:'.$json_obj->CurrentTemperature.', CurrentHumidity:'.$json_obj->CurrentHumidity);
        /* */
        $data = [
            'devId' => $json_obj->DeviceName, 
            'temp' => $json_obj->CurrentTemperature, 
            'humi' => $json_obj->CurrentHumidity, 
            'timeStamp' => date('Y-m-d H:i:s',time())
        ];

        $ret = (new Thinfo())->insert($data);
	$this->print('DeviceName:'.$json_obj->DeviceName.', rssi:'.$json_obj->rssi);
        $ret = (new Device())
	            ->where('devicename', '=', $json_obj->DeviceName)
	            ->update(['signalstrength' => $json_obj->rssi, 'updatetime' => time()]);


        return $ret;
    }
    
    private function get_topic($msg)
    {
        /*获取Topic采用比较原始的方式，关键字+字符串截取，后续如果有更好方式可以优化*/
        $result = substr($msg, strripos($msg,"topic"));
        $result = substr($result,0, strripos($result,"subscription"));
        return $result;
    }

    private function msg_parse($msg)
    {
        LOG::write("Msg from Ali IOT ".$msg, 'info');
        $topic = $this->get_topic($msg);

        if(strstr($topic,"/as/mqtt/status/"))
        {
            $this->update_device_state($msg);
        }
        else if(strstr($topic,"/user/update"))
        {
            $this->add_thinfo($msg);
        }
        else{
            LOG::write("[LJF] Invalid topic:".$topic);
        }
    }
    public function index() {
        LOG::write("Amqpgetmsg start...", 'info');
        $stomp = $this->start_consume();
        while (true) {
            if ($stomp == null || !$stomp->getClient()->isConnected()) {
                LOG::write("connection not exists, will reconnect after 10s.".PHP_EOL, 'error');
                sleep(10);
                $stomp = $this->start_consume();
            }

            try {
                //处理消息业务逻辑。
                $msg = $stomp->read();

                //判断获取空数据不进行处理
                if(strlen($msg) > 1)
                {
                    $this->msg_parse($msg);
                }

            } catch (HeartbeatException $e) {
                LOG::write('The server failed to send us heartbeats within the defined interval.'.PHP_EOL, 'error');
                $stomp->getClient()->disconnect();
            } catch (Exception $e) {
                LOG::write('process message occurs error ' . $e->getMessage().PHP_EOL, 'error');
            }
        }
    }
};
